1. 组件简介
在复杂的分布式系统中,延迟任务(如订单超时自动取消、预约提醒)是必不可少的场景。虽然 RabbitMQ、RocketMQ 等消息中间件提供了延迟消息功能,但在某些轻量级或特定场景下,基于 Redis 实现延迟队列更为灵活。
然而,传统的基于 Redis(ZSet 或 List)的延迟队列在高并发场景下容易遇到性能瓶颈:
- 热点 Key 问题:所有消息写入同一个 Key,导致单节点压力过大。
- 消费瓶颈:单个消费者线程无法处理海量积压消息。
本组件基于 Redisson 客户端封装,引入了分片(Sharding)机制,将一个逻辑队列拆分为多个物理队列,实现了生产端的负载均衡与消费端的并行处理,并通过 Spring Boot 自动配置实现了开箱即用。
2. 核心架构设计
2.1 底层实现
组件底层强依赖 Redisson 的两个核心对象:
RDelayedQueue:利用 Redis 的zset实现消息的延迟存储与定时触发(将到期消息移动到阻塞队列)。RBlockingQueue:作为目标队列,消费者通过BLPOP阻塞式获取就绪消息。
2.2 分片(Sharding)设计模型
这是本组件的核心亮点。为了突破单队列性能极限,我们采用了 逻辑 Topic -> 物理 Topic 的映射机制。
- 逻辑层:业务方只关注一个 Topic,例如
order_cancel。 - 物理层:框架根据配置的
isolation-region-count(分片数),自动将其拆分为order_cancel-0,order_cancel-1...order_cancel-N。 - 生产者:通过**轮询(Round-Robin)**算法将消息均匀分散到各个物理分片。
- 消费者:框架自动启动对应数量的监听线程,一对一监听物理分片,实现并行消费。
3. 核心模块与源码解析
组件主要由以下四个模块构成:
| 模块 | 关键类 | 核心职责 |
|---|---|---|
| Config | DelayQueueAutoConfig | 负责 Spring Boot 自动装配,加载 DelayQueueProperties 配置。 |
| Context | DelayQueueContext | 生产者的统一入口,负责管理 Topic 与分片组合的映射关系。 |
| Core | IsolationRegionSelector | 分片选择器,利用 AtomicInteger 实现轮询负载均衡。 |
| Core | DelayConsumerQueue | 消费者核心,内部维护监听线程池(阻塞读)和执行线程池(异步运行)。 |
| Event | DelayQueueInitHandler | 启动引导器,监听 Spring 上下文启动事件,扫描并初始化消费者。 |
3.1 关键交互流程
graph TD
subgraph Producer [生产者端]
API[DelayQueueContext.sendMessage] --> Combine[DelayQueueProduceCombine]
Combine --> Selector[IsolationRegionSelector]
Selector -- 轮询算法 --> Q0[Queue-0]
Selector -- 轮询算法 --> Q1[Queue-1]
Selector -- 轮询算法 --> Q2[Queue-2]
end
subgraph Redis [Redisson Server]
Q0 -.-> R0[(Redis List 0)]
Q1 -.-> R1[(Redis List 1)]
Q2 -.-> R2[(Redis List 2)]
end
subgraph Consumer [消费者端]
Init[DelayQueueInitHandler] -- 扫描并启动 --> Threads
R0 --> C0[DelayConsumerQueue-0]
R1 --> C1[DelayConsumerQueue-1]
R2 --> C2[DelayConsumerQueue-2]
C0 --> Task[ConsumerTask 业务逻辑]
C1 --> Task
C2 --> Task
end4. 项目实战指南 (Damai Project)
以 “订单超时未支付自动取消” 场景为例,演示如何在项目中使用该组件。
4.1 第一步:引入依赖与配置
在 application.yml 中配置分片数量和线程池参数:
delay:
queue:
isolation-region-count: 5 # 核心参数:将队列拆分为 5 个分片
core-pool-size: 4 # 消费者执行业务逻辑的线程池核心大小4.2 第二步:定义消费者 (Consumer)
业务方只需实现 ConsumerTask 接口,定义 Topic 名称和具体业务逻辑。框架会自动发现该 Bean 并启动监听。
@Component
public class DelayOrderCancelConsumer implements ConsumerTask {
@Autowired
private OrderService orderService;
@Override
public void execute(String content) {
// content 为 JSON 字符串,需反序列化
DelayOrderCancelMessageModule message = JSON.parseObject(content, DelayOrderCancelMessageModule.class);
// 执行核心业务:取消订单
orderService.cancel(message.getOrderNumber());
System.out.println("订单自动取消执行完毕: " + message.getOrderNumber());
}
@Override
public String topic() {
// 定义逻辑 Topic,框架会自动将其扩展为 topic-0, topic-1...
return "delay_order_cancel_topic";
}
}原理注脚:应用启动时,
DelayQueueInitHandler会扫描到这个 Bean,根据isolation-region-count: 5的配置,自动创建 5 个DelayConsumerQueue实例,分别监听delay_order_cancel_topic-0到delay_order_cancel_topic-4。
4.3 第三步:发送延迟消息 (Producer)
注入 DelayQueueContext 统一入口,指定 Topic、消息内容和延迟时间。
@Component
public class DelayOrderCancelSend {
@Autowired
private DelayQueueContext delayQueueContext;
public void sendMessage(DelayOrderCancelDto dto) {
String content = JSON.toJSONString(dto);
// 发送消息
// 参数:Topic, 消息内容, 延迟数值, 时间单位
delayQueueContext.sendMessage(
"delay_order_cancel_topic",
content,
15,
TimeUnit.MINUTES
);
System.out.println("延迟取消订单任务已发送,订单号:" + dto.getOrderNumber());
}
}原理注脚:
DelayQueueContext内部会调用IsolationRegionSelector进行轮询,假设当前轮询到索引 2,消息将被写入 Redis 的delay_order_cancel_topic-2队列中。
5. 总结
该组件通过简单的配置和接口封装,解决了传统 Redis 延迟队列的扩展性问题:
- 高吞吐:分片机制打散了 Key 的读写压力,支持水平扩展。
- 易集成:业务方只需关注
ConsumerTask接口,无需关心底层的 Redisson 操作和线程管理。 - 高并发:独立的监听线程池与执行线程池模型,确保了消息消费的及时性。